python操作hdfs 您所在的位置:网站首页 hdfs dfs -touch python操作hdfs

python操作hdfs

2023-11-02 12:50| 来源: 网络整理| 查看: 265

hdfs模块时python的一个第三方库,可以允许直接对hadoop的hdfs模块进行访问.

安装 安装hadoop

关于hadoop的安装配置会在另一篇文章中介绍,这里只介绍python的hdfs库的安装.

安装hdfs库

所有python的三方模块均采用pip来安装.

pip install hdfs hdfs库的使用

下面将介绍hdfs库的方法列表,并会与hadoop自带的命令行工具进行比较

注:hdfs dfs开头是hadoop自带的命令行工具命令

连接hadoop

通过http协议连接hadoop的datanode节点,默认端口50070

from hdfs.client import Client client = Client("http://127.0.0.1:50070/")

注:为了节省篇幅,下面的所有代码片段默认包含上两行,此外,后续所有的hdfs指代hadoop的hdfs模块,而非python的hdfs库

list()

list()会列出hdfs指定路径的所有文件信息,接收两个参数

hdfs_path 要列出的hdfs路径 status 默认为False,是否显示详细信息 print("hdfs中的目录为:", client.list(hdfs_path="/",status=True))

查看hdfs根目录下的文件信息,等同于hdfs dfs -ls /

status()

查看文件或者目录状态,接收两个参数

hdfs_path 要列出的hdfs路径 strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise print(client.status(hdfs_path="/b.txt",strict=True)) checksum()

checksum() 计算目录下的文件数量,只有一个参数.

print("根目录下的文件数量为:", client.checksum(hdfs_path="/input.txt")) parts()

列出路径下的part file,接收三个参数

hdfs_path 要列出的hdfs路径 parts 要显示的parts数量 默认全部显示 status 默认为False,是否显示详细信息 print("", client.parts(hdfs_path="/log", parts=0, status=True)) content()

列出目录或文件详情,接收两个参数

hdfs_path 要列出的hdfs路径 strict 是否开启严格模式,严格模式下目录或文件不存在不会返回None,而是raise print(client.content(hdfs_path="/",strict=True)) makedirs()

创建目录,同hdfs dfs -mkdir与hdfs dfs -chmod的结合体,接收两个参数

hdfs_path hdfs路径 permission 文件权限 print("创建目录", client.makedirs(hdfs_path="/t", permission="755")) rename()

文件或目录重命名,接收两个参数

hdfs_src_path 原始路径或名称 hdfs_dst_path 修改后的文件或路径 client.rename(hdfs_src_path="/d.txt",hdfs_dst_path="/d.bak.txt") resolve()

返回绝对路径,接收一个参数hdfs_path

print(client.resolve("d.txt")) set_replication()

设置文件在hdfs上的副本(datanode上)数量,接收两个参数,集群模式下的hadoop默认保存3份

hdfs_path hdfs路径 replication 副本数量 client.set_replication(hdfs_path="/b.txt",replication=2) read()

读取文件信息 类似与 hdfs dfs -cat hfds_path,参数如下:

hdfs_path hdfs路径 offset 读取位置 length 读取长度 buffer_size 设置buffer_size 不设置使用hdfs默认100MB 对于大文件 buffer够大的化 sort与shuffle都更快 encoding 指定编码 chunk_size 字节的生成器,必须和encodeing一起使用 满足chunk_size设置即 yield delimiter 设置分隔符 必须和encodeing一起设置 progress 读取进度回调函数 读取一个chunk_size回调一次 # 读取200长度 with client.read("/input.txt", length=200, encoding='utf-8') as obj: for i in obj: print(i) # 从200位置读取200长度 with client.read("/input.txt", offset=200, length=200, encoding='utf-8') as obj: for i in obj: print(i) # 设置buffer为1024,读取 with client.read("/input.txt", buffer_size=1024, encoding='utf-8') as obj: for i in obj: print(i) # 设置分隔符为换行 p = client.read("/input.txt", encoding='utf-8', delimiter='\n') with p as d: print(d, type(d), next(d)) # 设置读取每个块的大小为8 p = client.read("/input.txt", encoding='utf-8', chunk_size=8) with p as d: print(d, type(d), next(d)) download()

从hdfs下载文件到本地,参数列表如下.

hdfs_path hdfs路径 local_path 下载到的本地路径 overwrite 是否覆盖(如果有同名文件) 默认为Flase n_threads 启动线程数量,默认为1,不启用多线程 temp_dir下载过程中文件的临时路径 **kwargs其他属性 print("下载文件结果input.txt:", client.download(hdfs_path="/input.txt", local_path="~/",overwrite=True))

等同 hdfs dfs copyToLocal /input ~/

upload()

上传文件到hdfs 同hdfs dfs -copyFromLocal local_file hdfs_path,参数列表如下:

hdfs_path, hdfs上位置 local_path, 本地文件位置 n_threads=1 并行线程数量 temp_dir=None, overwrite=True或者文件已存在的情况下的临时路径 chunk_size=2 ** 16 块大小 progress=None, 报告进度的回调函数 完成一个chunk_size回调一次 chunk_size可以设置大点 如果大文件的话 cleanup=True, 上传错误时 是否删除已经上传的文件 **kwargs 上传的一些关键字 一般设置为 overwrite 来覆盖上传 def callback(filename, size): print(filename, "完成了一个chunk上传", "当前大小:", size) if size == -1: print("文件上传完成") # 上传成功返回 hdfs_path client.upload(hdfs_path="/a_bak14.txt", local_path="a.txt", chunk_size=2


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有